package com.sg.common.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import com.sg.common.db.DbUtil;

/**
 * adb------>bdb adb<------bdb
 * 
 * 适用于相同表结构间的双向同步 select、update、insert的属性顺序需要一致
 * 必须有更新时间先后判断的基准属性
 * 
 */
public class SyncTwoWay {
	public SyncTwoWay(String aConStr, String bConStr, String selSql,
			String insertSql, String updateSql) {
		super();
		this.aConStr = aConStr;
		this.bConStr = bConStr;
		this.selSql = selSql;
		this.insertSql = insertSql;
		this.updateSql = updateSql;
	}



	private Logger log = Logger.getLogger(getClass());
	
	String aConStr;
	String bConStr;
	/**
	 * 获取两个数据源的通用selectsql，id必须为第一个（数据比较基准）,updateTime(比较时间基准)为第二个
	 */
	String selSql;
	/**
	 * insert into db (id,updateTime,b,xx) values (???)
	 */
	String insertSql;
	/**
	 * update updateTime=?,xxx=?,xx where id=?
	 */
	String updateSql;
	
	

	public void sync() throws SQLException{
		final Connection acon = DbUtil.cusCon(aConStr);
		final Connection bcon = DbUtil.cusCon(bConStr);
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = acon.prepareStatement(selSql);
			rs = ps.executeQuery();
			ResultSetMetaData metaData = rs.getMetaData();
			final int columnCount = metaData.getColumnCount();
			Map<String,String[]> asl = new HashMap<String,String[]>();
			while (rs.next()) {
				int i = 0;
				String[] ns = new String[columnCount];
				while (i < columnCount) {
					ns[i] = rs.getString(++i);
				}
				asl.put(ns[0],ns);
			}
			ps = bcon.prepareStatement(selSql);
			rs = ps.executeQuery();
			Map<String,String[]> bsl = new HashMap<String,String[]>();
			while (rs.next()) {
				int i = 0;
				String[] ns = new String[columnCount];
				while (i < columnCount) {
					ns[i] = rs.getString(++i);
				}
				bsl.put(ns[0],ns);
			}
			final Map<String,String[]> fbsl=bsl;
			final Map<String,String[]> fasl=asl;
			Set<String> akeys = asl.keySet();
			final CountDownLatch count = new CountDownLatch(akeys.size());
			final ExecutorService executorService = Executors
					.newFixedThreadPool(50);
			for(final String akey:akeys){
				executorService.submit(new Runnable() {
					public void run() {
						PreparedStatement ups = null;
						PreparedStatement lps = null;
						try {
							String[] avalue = fasl.get(akey);
							if(fbsl.containsKey(akey)){
								String[] bvalue = fbsl.get(akey);
								//进行比较更新时间
								String atime = avalue[1];
								String btime = bvalue[1];
								boolean atEmpty = !StringUtils.isNotEmpty(atime);
								boolean btEmpty = !StringUtils.isNotEmpty(btime);
								if(atEmpty&&btEmpty){
									//donothing..
								}else if(!atEmpty&&btEmpty){
									SyncTwoWay.update(ups,bcon, columnCount, avalue,updateSql);
								}else if(atEmpty&&!btEmpty){
									SyncTwoWay.update(ups, acon, columnCount, bvalue,updateSql);
								}else if(!atEmpty&&!btEmpty){
									int res = atime.compareTo(btime);
									if(res==0){
										//donothing
									}else if(res==1){
										SyncTwoWay.update(ups,bcon, columnCount, avalue,updateSql);
									}else if(res==-1){
										SyncTwoWay.update(ups, acon, columnCount, bvalue,updateSql);
									}
								}
							}else{
								//b中无a则插入b
								SyncTwoWay.insert(lps,bcon, columnCount, avalue,insertSql);
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							// 运动员到达终点,count数减一
							count.countDown();
							try {
								if (ups != null) {
									ups.close();
								}
								if (lps != null) {
									lps.close();
								}
							} catch (SQLException e) {
								e.printStackTrace();
							}
						}
					}

					private void insert(PreparedStatement lps,
							final Connection bcon,
							final int columnCount, String[] avalue)
							throws SQLException {
						lps = bcon.prepareStatement(insertSql);
						int k = 1;
						while (k <= columnCount + 1) {
							lps.setString(k, avalue[k - 1]);
							k++;
						}
						lps.executeUpdate();
					}

					private void update(PreparedStatement ups,
							final Connection bcon,
							final int columnCount, String[] avalue)
							throws SQLException {
						ups = bcon.prepareStatement(updateSql);
						int j = 1;
						while (j < columnCount) {
							ups.setString(j, avalue[j++]);
						}
						ups.setString(j, avalue[0]);
						ups.executeUpdate();
					}
				});
			}
			long s = System.currentTimeMillis();
			log.info("a---->b, from " + s);

			count.await();
			long e = System.currentTimeMillis();
			log.info("a---->b, lasts " + (e - s));
			executorService.shutdown();
			
			
			Set<String> bkeys = bsl.keySet();
			final CountDownLatch bcount = new CountDownLatch(bkeys.size());
			final ExecutorService bexecutorService = Executors
					.newFixedThreadPool(50);
			for(final String bkey:bkeys){
				bexecutorService.submit(new Runnable() {
					public void run() {
						PreparedStatement lps = null;
						try {
							String[] bvalue = fbsl.get(bkey);
							if(fasl.containsKey(bkey)){
									//donothing..
							}else{
								//a中无b则插入a
								SyncTwoWay.insert(lps,acon, columnCount, bvalue,insertSql);
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							// 运动员到达终点,count数减一
							count.countDown();
							try {
								if (lps != null) {
									lps.close();
								}
							} catch (SQLException e) {
								e.printStackTrace();
							}
						}
					}
				});
			}
			long bs = System.currentTimeMillis();
			log.info("b---->a, from " + s);

			bcount.await();
			long be = System.currentTimeMillis();
			log.info("b---->a, lasts " + (be - bs));
			bexecutorService.shutdown();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	private static void insert(PreparedStatement lps,
			final Connection bcon,
			final int columnCount, String[] avalue,String insertSql)
			throws SQLException {
		lps = bcon.prepareStatement(insertSql);
		int k = 1;
		while (k <= columnCount + 1) {
			lps.setString(k, avalue[k - 1]);
			k++;
		}
		lps.executeUpdate();
	}

	private static void update(PreparedStatement ups,
			final Connection bcon,
			final int columnCount, String[] avalue,String updateSql)
			throws SQLException {
		ups = bcon.prepareStatement(updateSql);
		int j = 1;
		while (j < columnCount) {
			ups.setString(j, avalue[j++]);
		}
		ups.setString(j, avalue[0]);
		ups.executeUpdate();
	}
}
